package com.sunxd.batch.config;

import com.sunxd.batch.listenner.BlogReadListener;
import com.sunxd.batch.listenner.BlogWriterListener;
import com.sunxd.batch.listenner.MyJobListener;
import com.sunxd.batch.po.BlogInfo;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**
 * @author: sun.xd
 * @date: 2021-11-01 17:39
 **/

@Configuration
@EnableBatchProcessing
class MyBatchConfig {

    @Autowired
    private JobRepository jobRepository;

    @Bean
    @Primary
    public SimpleJobLauncher simpleJobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        return jobLauncher;
    }


    @Bean
    public Job myJob(JobBuilderFactory jobs, Step myStep){
        return jobs.get("myJob")
                .incrementer(new RunIdIncrementer())
                .flow(myStep)
                .end()
                .listener(new MyJobListener())
                .build();
    }

    @Bean
    public Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<BlogInfo> blogReader,
                       ItemWriter<BlogInfo> blogWriter, ItemProcessor<BlogInfo, BlogInfo> blogProcessor){
        return stepBuilderFactory
                .get("myStep")
                .<BlogInfo, BlogInfo>chunk(3) // Chunk的机制(即每次读取一条数据，再处理一条数据，累积到一定数量后再一次性交给writer进行写入操作)
                .reader(blogReader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)
                .listener(new BlogReadListener())
                .processor(blogProcessor)
                .writer(blogWriter).faultTolerant().skip(Exception.class).skipLimit(2)
                .listener(new BlogWriterListener())
                .build();
    }


}
